#YashanDB Spark Connector
YashanDB Spark Connector可以支持通过Spark读取YashanDB中存储的数据,也支持通过Spark写入数据到YashanDB。
# 版本配套说明
Connector Version | Spark Version | YashanDB Version | Java Version | Scala Version |
---|---|---|---|---|
1.1.0 | 3.2~3.5 | 23.2以及以上 | 1.8 | 2.12,2.11 |
# 部署连接器
安装spark,具体操作请查阅Spark官网文档 (opens new window)。
安装YashanDB连接器:
1). 将YashanDB Spark Connector组件和yasdb-jdbc驱动的jar包复制到Spark安装路径下的jars目录中。YashanDB Spark Connector组件和yasdb-jdbc驱动的jar包可联系我们的技术支持获取。
2). 重启spark。
# 功能介绍
# 使用样例(读取相关)
# SQL
create TEMPORARY VIEW spark_yashandb USING yashandb OPTIONS(
"url"="jdbc:yasdb://192.168.4.20:3688/test",
"dbtable"="DS_AST_RET_TRAN_CUST_PROD_AGENT200",
"user"="SPARKTEST",
"password"="123456"
);
select * from spark_yashandb;
Copied!
# DataFrame
val yasdata = spark.read.format("yashandb")
.option("url","jdbc:yasdb://192.168.4.20:3688/test")
.option("user","SPARKTEST")
.option("password","123456")
.option("dbtable","DS_AST_RET_TRAN_CUST_PROD_AGENT200")
.option("fetchsize",10240)
.load();
yasdata.show(5)
Copied!
# 参数说明
参数名 | 默认值 | 参数说明 | 参数生效范围 |
---|---|---|---|
url | (none) | YashanDB数据库的JDBC URL | 读取数据、写入数据 |
dbtable | (none) | 待读取或写入的JDBC表。在读取路径中使用该参数时,可以配置为SQL查询的FROM子句中有效的任何内容,例如使用括号中的子查询来代替完整的表 不允许同时指定数据库表和查询选项 | 读取数据、写入数据 |
user | (none) | 连接用户名 | 读取数据、写入数据 |
password | (none) | 连接密码 | 读取数据、写入数据 |
batchsize | 1000 | JDBC批处理大小,该参数值也将决定每次往返写入数据的行数 | 写入数据 |
query | (none) | 用于将数据读取到Spark中的查询语句,该参数指定的查询语句将用括号包围并用作FROM子句中的子查询,Spark将为该子查询分配一个别名。 * 不允许同时指定数据库表和查询选项 * 不允许同时指定query和partitionColumn参数。如仍需指定partitionColumn参数,可以使用dbtable参数指定子查询并用子查询别名限定相应分区列 | 读取数据、写入数据 |
writeMode | INSERT | YashanDB的写入模式,支持INSERT、BULKINSERT或BULKUPSERT。INSERT为普通写入模式,BULKINSERT和BULKUPSERT为bulkload模式,使用BULKLOAD模式的详细介绍请查阅hint | 写入数据 |
partitionColumn, lowerBound, upperBound | (none) | 若需并行查询分区列,则必须指定partitionColumn(指定分区列)、lowerBound(分区下限值)、upperBound(分区上限值)以及numPartitions(可并行的最大分区数) * partitionColumn必须指定为相关表中的数字、日期或时间戳列 * lowerBound和upperBound用于决定分区步长,并非筛选表中的行(表中的所有行都将被分区并返回) | 读取数据 |
numPartitions | (none) | 在表读取和写入过程中可用于并行的最大分区数,该参数值也将决定并发JDBC连接的最大数量。若待写入的分区数超过该参数值,会在写入前调用coalize(numPartitions)减少待写入分区数 | 读取数据 |
queryTimeout | 0 | 驱动程序等待Statement对象执行的时间(单位:秒),0表示没限等待 | 读取数据、写入数据 |
fetchsize | 1000 | JDBC获取大小,该参数值也将决定每次往返读取数据的行数 | 读取数据 |
truncate | false | 是否使用truncate table代替drop table操作 | 写入数据 |
enablePartitionQuery | true | 是否开启分区查询,开启后连接器会将数据库中分区表的每个子分区视作一个分区进行分区查询,分区查询只对分区表生效。单并行模式下,建议关闭分区查询 | 读取数据 |
enableRouteQuery | false | 是否开启route排序查询,开启后连接器会根据route排序分区。如需开启,请确保YashanDB为分布式部署且连接用户具备访问route$系统表的权限 | 读取数据 |
readerBufferSize | 1024 | connector读的缓存队列的大小 | 读取数据 |
read.fields | (none) | 需读取数据的表的字段清单,多个字段名间用逗号, 分隔 | 读取数据 |
write.fields | (none) | 需写入数据的表的字段或字段顺序,多个字段名间用逗号, 分隔。默认写入时需按照表字段顺序写入全部字段 | 写入数据 |
filterQuery | (none) | 过滤读取数据的表达式,此表达式拼接在查询语句的where子句后,查询时使用此表达式完成源端数据过滤 | 读取数据 |
pushDownPredicate | true | 是否开启谓词下推到JDBC数据源。开启时Spark将尽可能向下推送YashanDB数据源的过滤器,否则不会向YashanDB数据源下推任何筛选器(所有筛选器都将由Spark处理)。当Spark执行谓词筛选的速度高于YashanDB数据源时,建议关闭谓词下推 | 读取数据 |
enableJdbcWrite | false | 是否开启JDBC写入模式,关闭时使用FastLoad写入模式 | 写入数据 |
JDBC写入模式参数如下,仅enableJdbcWrite=true时生效。
参数名 | 默认值 | 参数说明 | 参数生效范围 |
---|---|---|---|
parallel.binder | 3 | binder线程的线程数,如果Binder线程成为瓶颈,可以适当调大此参数。 | 写入数据 |
batchesPerTxn | 3 | 每个事务中的数据批数,导入为可变数据时,不建议调大此值;导入为稳态数据时,建议将此值调大至百级或千级。 | 写入数据 |
FastLoad写入模式参数如下,仅enableJdbcWrite=false时生效。
参数名 | 默认值 | 参数说明 | 参数生效范围 |
---|---|---|---|
fastload.sendCountAtOnce | 10000 | FastLoad每次发送给服务端的行数 | 写入数据 |
fastload.maxWaitLineCount | 10000*(CPU核心数*2+1) | FastLoad缓冲队列中最大行数 | 写入数据 |
fastload.commitCount | 20000 | 提交行数,为0则表示不中途提交 | 写入数据 |
fastload.closeTimeout | 3000ms | FastLoad关闭的超时等待时间 | 写入数据 |
fastload.senderCount | 所在机器的CPU核心数 | FastLoad的并行sender的数量 | 写入数据 |
fastload.maxReaderCount | 所在机器的CPU核心数/2 | FastLoad的并行reader的数量 | 写入数据 |
fastload.putDataWaitTime | 10ms | writer线程向FastLoad的缓冲队列发送数据的超时等待时间 | 写入数据 |
# 数据类型映射
YashanDB Type | Spark Type |
---|---|
Boolean | BooleanType |
TINYINT | IntegerType |
SMALLINT | IntegerType |
INTEGER | IntegerType |
BIGINT | LongType |
DOUBLE | DoubleType |
FLOAT | FloatType |
REAL | DoubleType |
NUMBER | createDecimalType() |
NUMERIC | createDecimalType() |
DECIMAL | createDecimalType() |
DATE | DateType |
TIME | TimestampType |
TIMESTAMP | TimestampType |
YM_INTERVAL | createYearMonthIntervalType() |
DS_INTERVAL | createDayTimeIntervalType() |
CHAR | StringType |
NCHAR | StringType |
VARCHAR | StringType |
NVARCHAR | StringType |
CLOB | StringType |
NCLOB | StringType |
RAW | StringType |
JSON | StringType |
BLOB | BinaryType |
BIT | BinaryType |